package com.changge.common.mq.service.impl;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.changge.common.mq.service.IConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
 * RabbitMQ消费者操作接口实现类
 *
 * @author zhangrongkang
 * @since 2023/10/24
 */
@Slf4j
public abstract class AbstractConsumer<T> implements IConsumerService {

    private Class<T> clazz;

    /**
     * 消息
     */
    private Message message;

    /**
     * 通道
     */
    private Channel channel;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        this.message = message;
        this.channel = channel;
        Type type = getClass().getGenericSuperclass();
        ParameterizedType parameterizedType = (ParameterizedType) type;
        this.clazz = (Class<T>) parameterizedType.getActualTypeArguments()[0];
        // 获取消息体
        String body = new String(message.getBody());
        // 对外暴露消费者处理方法
        onConsumer(getObject(body));
    }

    /**
     * 扩展消费方法
     *
     * @param data 消息体
     * @throws IOException IO异常
     */
    public abstract void onConsumer(T data) throws Exception;

    /**
     * 获取对应泛型
     *
     * @param body 消息体
     * @return 消息体对应泛型
     */
    private T getObject(String body) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        try {
            // 使用FastJson将消息转为对应类
            return JSON.parseObject(body, clazz);
        } catch (Exception e) {
            log.error("RabbitMQ转发层发生错误，请检查泛型是否与实际类型匹配, 指定的泛型是: {}", clazz.getName(), e);
        }
        return clazz.getDeclaredConstructor().newInstance();
    }

    /**
     * 确认消息
     */
    protected void ack() throws IOException {
        ack(Boolean.FALSE);
    }

    /**
     * 拒绝消息
     */
    protected void nack() throws IOException {
        nack(Boolean.FALSE, Boolean.FALSE);
    }

    /**
     * 拒绝消息
     */
    protected void basicReject() throws IOException {
        basicReject(Boolean.FALSE);
    }

    /**
     * 拒绝消息
     * <p>
     *   当前 DeliveryTag 的消息是否确认所有
     * </p>
     *
     * @param multiple true 是， false 否
     */
    protected void basicReject(Boolean multiple) throws IOException {
        this.channel.basicReject(this.message.getMessageProperties().getDeliveryTag(), multiple);
    }

    /**
     * 是否自动确认
     * <p>
     *   当前 DeliveryTag 的消息是否确认
     * </p>
     *
     * @param multiple true 是， false 否
     */
    protected void ack(Boolean multiple) throws IOException {
        this.channel.basicAck(this.message.getMessageProperties().getDeliveryTag(), multiple);
    }

    /**
     * 拒绝消息
     *
     * @param multiple 当前 DeliveryTag 的消息是否确认 true 是， false 否
     * @param requeue 当前 DeliveryTag 消息是否重回队列 true 是 false 否
     */
    protected void nack(Boolean multiple, Boolean requeue) throws IOException {
        this.channel.basicNack(this.message.getMessageProperties().getDeliveryTag(), multiple, requeue);
    }

}
